feat(database): Add unified MongoDB and PostgreSQL dual-write mechanism #258
Conversation
…zation - Added comprehensive documentation for the dual-write feature in README.md - Updated requirements.txt to include psycopg2-binary for PostgreSQL support - Modified Django settings to configure PostgreSQL as the primary database - Introduced dual-write operations with error handling and monitoring capabilities - Enhanced project structure for future migration paths and Docker development setup
…vice transformation
…for enhanced functionality
…te service for task assignments and user roles
…assignment deletion process
…data handling and enhance dual-write service integration
…odes and update watchlist collection naming
…or labels and roles
Actionable comments posted: 98
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (15)
todo/repositories/abstract_repository.py (1)
184-201: AbstractAuditLogRepository interface mismatches PostgresAuditLog fields

The concrete Postgres implementation in todo/repositories/postgres_repository.py invokes filters on fields that don't exist on the PostgresAuditLog model:

- get_by_user(...) filters on user_mongo_id, but the Django model defines performed_by instead.
- get_by_collection(...) filters on collection_name, yet no such column exists on PostgresAuditLog.

As-is, these calls will raise runtime errors. To resolve:

- Rename and adjust get_by_user to get_by_performed_by (and filter on performed_by).
- Either remove or redefine get_by_collection to target a valid field (e.g. task_id or team_id), or update the model to include the intended column.
- Update the abstract interface (AbstractAuditLogRepository) to match the revised method names and signatures.

Please make these critical fixes before merging.
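A minimal sketch of how the renamed lookup could look on the Postgres side, assuming a Django-ORM-backed repository; the class name, import path, and second method name are illustrative rather than the PR's actual code:

```python
from todo.models.postgres_models import PostgresAuditLog  # hypothetical import path


class PostgresAuditLogRepository:
    """Illustrative repository aligned with the fields the review says exist."""

    @staticmethod
    def get_by_performed_by(performed_by: str, limit: int = 100):
        # Filter on performed_by, the column the Django model actually defines.
        return list(PostgresAuditLog.objects.filter(performed_by=performed_by)[:limit])

    @staticmethod
    def get_by_task(task_id: str, limit: int = 100):
        # Replaces get_by_collection by targeting a real column such as task_id.
        return list(PostgresAuditLog.objects.filter(task_id=task_id)[:limit])
```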
todo/services/task_assignment_service.py (1)
63-73: Guard against creation failures; exclusivity is already enforced

After calling TaskAssignmentRepository.create(...), add a check to surface failures instead of returning a response with missing identifiers:

```python
assignment = TaskAssignmentRepository.create(task_assignment)
if not assignment:
    raise ValueError("Failed to create task assignment")
```

The repository's update_assignment method (in todo/repositories/task_assignment_repository.py, lines 90–109) already deactivates any existing active assignments—first by matching on ObjectId(task_id), then on the string task_id—before creating the new assignment, ensuring that a task cannot remain assigned to both a team and an individual simultaneously.

todo/repositories/team_creation_invite_code_repository.py (2)
31-35: Critical: find_one_and_update returns the pre-update doc due to wrong return_document value

PyMongo expects ReturnDocument.BEFORE/AFTER, not a boolean. Passing True will not yield the updated document and will likely return the pre-update version, breaking the dual-write payload and the method's return value.

Apply this diff:

```diff
-        result = collection.find_one_and_update(
+        result = collection.find_one_and_update(
             {"code": code, "is_used": False},
-            {"$set": {"is_used": True, "used_by": used_by, "used_at": current_time.isoformat()}},
-            return_document=True,
+            {"$set": {"is_used": True, "used_by": used_by, "used_at": current_time}},
+            return_document=ReturnDocument.AFTER,
         )
```

And add the missing import at the top of the file:

```diff
+from pymongo import ReturnDocument
```

122-131: Consistent API shape for created_by/used_by in list response

created_by falls back to {}, while used_by is None when unavailable. This asymmetry complicates clients. Recommend returning None for both when absent.

```diff
-        enhanced_code = {
+        enhanced_code = {
             "id": str(code["_id"]),
             "code": code["code"],
             "description": code.get("description"),
             "created_at": code.get("created_at"),
             "used_at": code.get("used_at"),
             "is_used": code.get("is_used", False),
-            "created_by": created_by_user or {},
-            "used_by": used_by_user,
+            "created_by": created_by_user or None,
+            "used_by": used_by_user or None,
         }
```

todo/repositories/team_repository.py (3)
80-106: Missing Postgres dual-write on team update

The create path dual-writes, but update doesn't. Postgres will drift from Mongo. Add an update_document call after a successful Mongo update.

```diff
         updated_doc = teams_collection.find_one_and_update(
             {"_id": ObjectId(team_id), "is_deleted": False},
             {"$set": update_data},
             return_document=ReturnDocument.AFTER,
         )
-        if updated_doc:
-            return TeamModel(**updated_doc)
+        if updated_doc:
+            # Sync update to Postgres
+            dual_write_service = EnhancedDualWriteService()
+            team_data = {
+                "name": updated_doc.get("name"),
+                "description": updated_doc.get("description"),
+                "invite_code": updated_doc.get("invite_code"),
+                "poc_id": str(updated_doc.get("poc_id")) if updated_doc.get("poc_id") else None,
+                "created_by": str(updated_doc.get("created_by")),
+                "updated_by": str(updated_doc.get("updated_by")),
+                "is_deleted": updated_doc.get("is_deleted", False),
+                "created_at": updated_doc.get("created_at"),
+                "updated_at": updated_doc.get("updated_at"),
+            }
+            dual_write_success = dual_write_service.update_document(
+                collection_name="teams", mongo_id=str(updated_doc["_id"]), data=team_data
+            )
+            if not dual_write_success:
+                logger.warning(f"Failed to sync team update {updated_doc['_id']} to Postgres")
+            return TeamModel(**updated_doc)
```

Ensure a module-level logger is available (see separate comment).
332-347: Reactivation path returns stale data and skips Postgres sync

When reactivating an existing relationship, you update Mongo but return the pre-update document and don't sync Postgres. Use find_one_and_update with ReturnDocument.AFTER and call update_document.

```diff
-        if not existing_relationship.get("is_active", True):
-            collection.update_one(
-                {"_id": existing_relationship["_id"]},
-                {
-                    "$set": {
-                        "is_active": True,
-                        "role_id": role_id,
-                        "updated_by": created_by_user_id,
-                        "updated_at": datetime.now(timezone.utc),
-                    }
-                },
-            )
-            return UserTeamDetailsModel(**existing_relationship)
+        if not existing_relationship.get("is_active", True):
+            updated = collection.find_one_and_update(
+                {"_id": existing_relationship["_id"]},
+                {
+                    "$set": {
+                        "is_active": True,
+                        "role_id": role_id,
+                        "updated_by": created_by_user_id,
+                        "updated_at": datetime.now(timezone.utc),
+                    }
+                },
+                return_document=ReturnDocument.AFTER,
+            )
+            if updated:
+                dual_write_service = EnhancedDualWriteService()
+                payload = {
+                    "user_id": str(updated["user_id"]),
+                    "team_id": str(updated["team_id"]),
+                    "is_active": True,
+                    "created_by": str(updated["created_by"]),
+                    "updated_by": str(updated["updated_by"]),
+                    "created_at": updated["created_at"],
+                    "updated_at": updated["updated_at"],
+                }
+                if not dual_write_service.update_document(
+                    collection_name="user_team_details", mongo_id=str(updated["_id"]), data=payload
+                ):
+                    logger.warning(f"Failed to sync user team reactivation {updated['_id']} to Postgres")
+                return UserTeamDetailsModel(**updated)
+            return UserTeamDetailsModel(**existing_relationship)
```
280-296
: Optional: collapse read-then-update into a single atomic operation

You read the active relationship, then update it. This is two round trips and susceptible to races. Consider find_one_and_update with ReturnDocument.BEFORE/AFTER to both assert is_active and get the prior doc in one call (you already did this pattern elsewhere).
If you want, I can draft a patch to switch remove_user_from_team to a single atomic call with the same dual-write behavior.
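To visualize that change, here is a rough sketch of a single atomic call for remove_user_from_team, assuming the collection stores an is_active flag and the dual-write service exposes update_document(collection_name, mongo_id, data) as described elsewhere in this review; the function signature and payload shape are illustrative:

```python
from datetime import datetime, timezone
from pymongo import ReturnDocument


def remove_user_from_team(collection, user_id, team_id, updated_by, dual_write_service, logger):
    # One round trip: assert the relationship is active and deactivate it atomically.
    updated = collection.find_one_and_update(
        {"user_id": user_id, "team_id": team_id, "is_active": True},
        {"$set": {"is_active": False, "updated_by": updated_by,
                  "updated_at": datetime.now(timezone.utc)}},
        return_document=ReturnDocument.AFTER,
    )
    if updated is None:
        return None  # no active relationship to remove

    # Best-effort Postgres sync mirroring the Mongo change.
    if not dual_write_service.update_document(
        collection_name="user_team_details",
        mongo_id=str(updated["_id"]),
        data={"is_active": False, "updated_by": str(updated_by),
              "updated_at": updated["updated_at"]},
    ):
        logger.warning("Failed to sync user_team_details %s to Postgres", updated["_id"])
    return updated
```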
todo/repositories/watchlist_repository.py (3)
28-36: Normalize ID string conversions for model initialization

You convert updatedBy to str when present, but not createdBy. Be consistent to avoid Pydantic coercion surprises.

```diff
         if doc:
             # Convert ObjectId fields to strings for the model
-            if "updatedBy" in doc and doc["updatedBy"]:
+            if "updatedBy" in doc and doc["updatedBy"]:
                 doc["updatedBy"] = str(doc["updatedBy"])
+            if "createdBy" in doc and doc["createdBy"]:
+                doc["createdBy"] = str(doc["createdBy"])
             return WatchlistModel(**doc)
```
317-325: Inconsistent type for updatedBy: store as str for consistency

Elsewhere IDs are stored as strings in watchlists. Here you're writing an ObjectId. Standardize on strings to match queries and pipelines that rely on $toObjectId later.

```diff
         update_result = watchlist_collection.update_one(
             {"userId": str(userId), "taskId": str(taskId)},
             {
                 "$set": {
                     "isActive": isActive,
                     "updatedAt": datetime.now(timezone.utc),
-                    "updatedBy": userId,
+                    "updatedBy": str(userId),
                 }
             },
         )
```
307-353: Refactor update() return annotation in watchlist_repository.py

The update method is currently declared as returning a dict but actually returns a pymongo.results.UpdateResult or None. Please adjust the signature and imports as follows:

• File: todo/repositories/watchlist_repository.py
• Location: method definition on line ~307

Suggested diff:

```diff
-from datetime import datetime, timezone
-from typing import List, Tuple
-from typing import Optional
+from datetime import datetime, timezone
+from typing import List, Tuple, Optional
+from pymongo.results import UpdateResult

 class WatchlistRepository(MongoRepository):
@@
-    @classmethod
-    def update(cls, taskId: ObjectId, isActive: bool, userId: ObjectId) -> dict:
+    @classmethod
+    def update(cls, taskId: ObjectId, isActive: bool, userId: ObjectId) -> Optional[UpdateResult]:
         """
         Update the watchlist status of a task.
         """
```

- Import UpdateResult from pymongo.results so the annotation resolves at runtime.
- Use Optional[UpdateResult] to accurately reflect that the method may return None.

todo/repositories/task_repository.py (1)
269-293: Soft-delete is not dual-written; Postgres gets out of sync.

delete_by_id marks isDeleted=True in Mongo but never syncs to Postgres. Add a dual-write update to keep Postgres consistent.

```diff
         deleted_task_data = tasks_collection.find_one_and_update(
             {"_id": task_id},
             {
                 "$set": {
                     "isDeleted": True,
                     "updatedAt": datetime.now(timezone.utc),
                     "updatedBy": user_id,
                 }
             },
             return_document=ReturnDocument.AFTER,
         )
-        if deleted_task_data:
-            return TaskModel(**deleted_task_data)
+        if deleted_task_data:
+            # Dual-write soft delete to Postgres
+            try:
+                dual_write_service = EnhancedDualWriteService()
+                dw_payload = {
+                    "isDeleted": True,
+                    "updatedAt": deleted_task_data["updatedAt"],
+                    "updatedBy": str(user_id),
+                }
+                dual_write_service.update_document(
+                    collection_name="tasks",
+                    mongo_id=str(task_id),
+                    data=dw_payload,
+                )
+            except Exception:
+                # Best-effort: do not block Mongo success
+                import logging
+                logging.getLogger(__name__).warning(f"Failed to sync task soft-delete {task_id} to Postgres")
+            return TaskModel(**deleted_task_data)
         return None
```

todo/repositories/task_assignment_repository.py (2)
158-166: Potential crash: PyObjectId(None) for team_id when user_type != "team".

Constructing PyObjectId with None will raise. Guard the conversion.

```diff
         new_assignment = TaskAssignmentModel(
             _id=PyObjectId(),
             task_id=PyObjectId(task_id),
             assignee_id=PyObjectId(assignee_id),
             user_type=user_type,
             created_by=PyObjectId(user_id),
             updated_by=None,
-            team_id=PyObjectId(team_id),
+            team_id=PyObjectId(team_id) if team_id else None,
         )
```
243-266: Type drift: storing assignee_id/updated_by as strings; use ObjectId consistently.

Mixed types in Mongo will break queries and necessitate fallbacks. Keep ObjectId for both update paths.

```diff
         result = collection.update_one(
             {"task_id": ObjectId(task_id), "is_active": True},
             {
                 "$set": {
-                    "assignee_id": executor_id,
+                    "assignee_id": ObjectId(executor_id),
                     "user_type": "user",
-                    "updated_by": user_id,
+                    "updated_by": ObjectId(user_id),
                     "updated_at": datetime.now(timezone.utc),
                 }
             },
         )
         if result.modified_count == 0:
             # Try with string if ObjectId doesn't work
             result = collection.update_one(
                 {"task_id": task_id, "is_active": True},
                 {
                     "$set": {
-                        "assignee_id": executor_id,
+                        "assignee_id": ObjectId(executor_id),
                         "user_type": "user",
-                        "updated_by": user_id,
+                        "updated_by": ObjectId(user_id),
                         "updated_at": datetime.now(timezone.utc),
                     }
                 },
             )
```

docker-compose.yml (1)
93-95: Connect mongo-express to the replica set

The Mongo container runs with --replSet rs0. Point mongo-express at the replica set URI for correctness.

Apply this diff:

```diff
-      ME_CONFIG_MONGODB_URL: mongodb://db:27017/
+      ME_CONFIG_MONGODB_URL: mongodb://db:27017/?replicaSet=rs0
```

todo_project/settings/base.py (1)
239-239: Safe default for CORS_ALLOWED_ORIGINS

Accessing os.getenv("CORS_ALLOWED_ORIGINS").split(",") will raise when the env var is unset. Provide a safe default.

Apply this diff:

```diff
-CORS_ALLOWED_ORIGINS = os.getenv("CORS_ALLOWED_ORIGINS").split(",")
+_cors = os.getenv("CORS_ALLOWED_ORIGINS", "")
+CORS_ALLOWED_ORIGINS = [o for o in _cors.split(",") if o] if _cors else []
```
♻️ Duplicate comments (1)
todo/repositories/task_assignment_repository.py (1)
90-131: Honor "exclusive assignment" rule from past learnings during update.

Per retrieved learnings, updates should ensure exclusivity by deactivating all active assignments before creating a new one. You do two update_many calls (ObjectId + string), which is correct; ensure no race by reading with majority write concern if available.
Would you like me to add a transactional pattern (session with retry) around both updates and the subsequent create to minimize race windows?
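For reference, a hedged sketch of that transactional pattern with PyMongo sessions; it assumes a replica-set deployment (the compose file runs --replSet rs0) and uses illustrative names rather than the repository's exact methods:

```python
from bson import ObjectId


def update_assignment_transactionally(client, collection, task_id, new_assignment_doc):
    """Deactivate existing assignments and create the new one in one transaction."""

    def _txn(session):
        # Deactivate every active assignment for the task (ObjectId and string forms).
        collection.update_many(
            {"task_id": ObjectId(task_id), "is_active": True},
            {"$set": {"is_active": False}},
            session=session,
        )
        collection.update_many(
            {"task_id": task_id, "is_active": True},
            {"$set": {"is_active": False}},
            session=session,
        )
        # Create the new active assignment inside the same transaction.
        return collection.insert_one(new_assignment_doc, session=session)

    with client.start_session() as session:
        # with_transaction retries transient errors and commits (or aborts) atomically.
        return session.with_transaction(_txn)
```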
…r dual-write and sync services
Review continued from previous batch...
Note: Other AI code review bot(s) detected. CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Important: Review skipped. Auto incremental reviews are disabled on this repository; check the settings in the CodeRabbit UI or the … You can disable this status message by setting the …

Summary by CodeRabbit
Walkthrough

Adds PostgreSQL integration: models, initial migration, repositories, services for dual-write from MongoDB, and sync utilities. Updates settings, docker-compose, and requirements. Introduces management command and initialization sync. Provides extensive documentation for the dual-write system. Minor README formatting and small service import/order tweaks.

Changes
Sequence Diagram(s)

sequenceDiagram
autonumber
participant Client
participant Repo as Mongo Repository
participant Mongo as MongoDB
participant DWS as EnhancedDualWriteService
participant PG as Postgres (Django ORM)
rect rgb(235,245,255)
note over Client,PG: Create flow (example: Task)
Client->>Repo: create(payload)
Repo->>Mongo: insertOne(payload)
Mongo-->>Repo: insertedId
Repo->>DWS: create_document("tasks", mapped, mongo_id)
alt Dual-write enabled
DWS->>PG: ORM create/related sync
PG-->>DWS: success/failure
DWS-->>Repo: bool
else Disabled
DWS-->>Repo: skipped (true)
end
Repo-->>Client: TaskModel
end
sequenceDiagram
autonumber
participant Repo as Mongo Repository
participant Mongo as MongoDB
participant DWS as EnhancedDualWriteService
participant PG as Postgres (Django ORM)
rect rgb(245,235,255)
note over Repo,PG: Update with deferred details
Repo->>Mongo: updateOne(filter, update)
Mongo-->>Repo: modifiedCount
alt modifiedCount > 0
Repo->>DWS: update_document("tasks", mongo_id, mapped)
DWS->>PG: ORM update
PG-->>DWS: ok
opt deferredDetails present
Repo->>PG: upsert PostgresDeferredDetails
PG-->>Repo: ok
end
end
end
sequenceDiagram
autonumber
participant Django as Django Init
participant PSS as PostgresSyncService
participant Mongo as MongoDB
participant PG as Postgres
rect rgb(235,255,245)
note over Django,PG: App startup sync (labels, roles)
Django->>PSS: sync_all_tables()
PSS->>PG: check table exists
PSS->>Mongo: count collections
PSS->>PG: count tables
alt needs sync
PSS->>Mongo: iterate documents
loop docs
PSS->>PG: create if not exists (by mongo_id)
end
else skip
PSS-->>Django: no-op
end
PSS-->>Django: boolean result
end
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
PostgresTaskAssignment.objects.filter(task_mongo_id=task_mongo_id).update(
    status="REJECTED",  # Mark as rejected instead of deleting
    updated_at=timezone.now(),
There appears to be a schema mismatch in the _sync_task_assignment_update method. The code is attempting to update a status field to "REJECTED", but according to the PostgresTaskAssignment model definition in this PR, this model doesn't have a status field. Instead, it uses an is_active boolean field for tracking assignment state.
The update should be modified to:
PostgresTaskAssignment.objects.filter(task_mongo_id=task_mongo_id).update(
is_active=False, # Mark as inactive instead of using status
updated_at=timezone.now(),
)
This would align with the model's schema and maintain consistency with how assignment state is tracked throughout the application.
Actionable comments posted: 34
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (8)
todo/services/task_assignment_service.py (5)
46-53: Make audit "unassign" logging best-effort (don't fail the main flow) and de-risk typos

Recommendation:

- Wrap audit log writes in try/except so logging failures don't break assignment updates.
- Centralize action names as constants or an Enum to avoid string typos.

```diff
-        if existing_assignment.user_type == "team":
-            AuditLogRepository.create(
-                AuditLogModel(
-                    task_id=existing_assignment.task_id,
-                    team_id=existing_assignment.assignee_id,
-                    action="unassigned_from_team",
-                    performed_by=PyObjectId(user_id),
-                )
-            )
+        if existing_assignment.user_type == "team":
+            try:
+                AuditLogRepository.create(
+                    AuditLogModel(
+                        task_id=existing_assignment.task_id,
+                        team_id=existing_assignment.assignee_id,
+                        action="unassigned_from_team",  # TODO: replace with AuditAction.UNASSIGNED_FROM_TEAM
+                        performed_by=PyObjectId(user_id),
+                    )
+                )
+            except Exception:
+                # TODO: replace with project logger
+                # logger.exception("Failed to write audit log for team unassignment")
+                pass
```
75-83: Apply the same best-effort pattern to "assigned_to_team" logging and consider action constants

Mirror the try/except used for unassign logs and avoid raw string actions.

```diff
-        if assignment.user_type == "team":
-            AuditLogRepository.create(
-                AuditLogModel(
-                    task_id=assignment.task_id,
-                    team_id=assignment.assignee_id,
-                    action="assigned_to_team",
-                    performed_by=PyObjectId(user_id),
-                )
-            )
+        if assignment.user_type == "team":
+            try:
+                AuditLogRepository.create(
+                    AuditLogModel(
+                        task_id=assignment.task_id,
+                        team_id=assignment.assignee_id,
+                        action="assigned_to_team",  # TODO: replace with AuditAction.ASSIGNED_TO_TEAM
+                        performed_by=PyObjectId(user_id),
+                    )
+                )
+            except Exception:
+                # logger.exception("Failed to write audit log for team assignment")
+                pass
```
85-97: Remove or track the commented "legacy" block

Commented code risks rot. Either:
- Remove it, or
- Add a TODO with an issue reference/owner, or
- Hide behind a feature flag if it must linger.
41-60: Ensure TaskAssignment dual-write enforces single active assignment and atomic consistency

The current implementation in TaskAssignmentRepository.update_assignment does not appear to:

- Deactivate any prior active assignments for that task in Mongo (is_active = false) before marking the new one active.
- Mirror that deactivation in Postgres (and the subsequent new assignment insert/update) within the same logical operation.
- Enforce via a database constraint that only one "active" assignment per task can exist in Postgres.

Mandatory fixes:

- In TaskAssignmentRepository.update_assignment (around todo/repositories/task_assignment_repository.py:90–120), add an explicit deactivation step that sets is_active = False on all existing assignments for the given task_id in Mongo, then create/update the new assignment record (and use the dual-write service to sync both the deactivation and the new activation to Postgres).
- In your Django migrations (e.g. todo/migrations/0001_initial_setup.py), add a partial unique index on postgres_task_assignments over (task_mongo_id) where is_active = true, or a unique_together + CheckConstraint equivalent, to enforce "one active assignment per task" at the database level.
- Introduce idempotency support on your dual-write paths (e.g. outbox table entries or idempotency keys in EnhancedDualWriteService) so that retries don't create duplicate or conflicting records.
- Document the write ordering and failure-recovery strategy in docs/DUAL_WRITE_SYSTEM.md (e.g., "Mongo deactivation → PG deactivation → Mongo creation → PG creation with transaction boundaries, backed by an outbox that retries on failure").
: Add and map TeamNotFoundException for consistent 404 handlingThe codebase currently lacks a
TeamNotFoundException
and a corresponding API error message, so replacing theValueError
won’t work until we introduce and wire up a new exception type. Please:
- Create the exception
```diff
+ todo/exceptions/team_exceptions.py
+-------------------------------
+ from todo.constants.messages import ApiErrors
+
+ class TeamNotFoundException(Exception):
+     def __init__(self, team_id: str | None = None, message_template: str = ApiErrors.TEAM_NOT_FOUND):
+         if team_id:
+             message = message_template.format(team_id)
+         else:
+             message = message_template
+         super().__init__(message)
```

- Add the "team not found" API error constant
```diff
 todo/constants/messages.py
 ---------------------------
 class ApiErrors:
     USER_NOT_FOUND = "User not found: {}"
     TASK_NOT_FOUND = "Task not found: {}"
+    TEAM_NOT_FOUND = "Team not found: {}"
```
- Wire it into the global exception handler
```diff
 todo/exceptions/exception_handler.py
 ------------------------------------
 elif isinstance(exc, TaskNotFoundException):
     status_code = status.HTTP_404_NOT_FOUND
     error_list.append(ApiErrorDetail(detail=str(exc)))
+elif isinstance(exc, TeamNotFoundException):
+    status_code = status.HTTP_404_NOT_FOUND
+    error_list.append(ApiErrorDetail(detail=str(exc)))
```

- Update the service to raise the new exception
```diff
 todo/services/task_assignment_service.py
 ----------------------------------------
 elif dto.user_type == "team":
     assignee = TeamRepository.get_by_id(dto.assignee_id)
     if not assignee:
-        raise ValueError(f"Team not found: {dto.assignee_id}")
+        raise TeamNotFoundException(dto.assignee_id)
```

- Don't forget to add:
from todo.exceptions.team_exceptions import TeamNotFoundException

These changes will ensure missing-team errors use a domain-specific exception and return HTTP 404 in line with users/tasks.
todo/repositories/team_creation_invite_code_repository.py (2)
31-35
: Bug: return_document must be ReturnDocument.AFTER, not True

PyMongo requires the ReturnDocument enum; passing True will raise a TypeError at runtime.
```diff
+from pymongo.collection import ReturnDocument
@@
-        result = collection.find_one_and_update(
+        result = collection.find_one_and_update(
             {"code": code, "is_used": False},
-            {"$set": {"is_used": True, "used_by": used_by, "used_at": current_time.isoformat()}},
-            return_document=True,
+            {"$set": {"is_used": True, "used_by": used_by, "used_at": current_time}},
+            return_document=ReturnDocument.AFTER,
         )
```
31-35
: Ensure used_by is stored as ObjectId in Mongo; normalize created_at for dual-write

Keep Mongo types consistent (ObjectId) and send normalized datetimes to Postgres to avoid serialization errors.
```diff
 from datetime import datetime, timezone
+from bson import ObjectId
@@
-        result = collection.find_one_and_update(
+        # normalize used_by to ObjectId where possible
+        try:
+            used_by_value = ObjectId(used_by) if used_by else None
+        except Exception:
+            used_by_value = used_by  # fallback to existing behavior
+        result = collection.find_one_and_update(
             {"code": code, "is_used": False},
-            {"$set": {"is_used": True, "used_by": used_by, "used_at": current_time}},
+            {"$set": {"is_used": True, "used_by": used_by_value, "used_at": current_time}},
             return_document=ReturnDocument.AFTER,
         )
@@
-        invite_code_data = {
+        # normalize created_at (legacy records may be strings)
+        _created_at = result.get("created_at")
+        if isinstance(_created_at, str):
+            _created_at = datetime.fromisoformat(_created_at.replace("Z", "+00:00"))
+        invite_code_data = {
             "code": result["code"],
             "description": result.get("description"),
             "is_used": True,
             "created_by": str(result["created_by"]),
-            "used_by": str(used_by),
-            "created_at": result.get("created_at"),
+            "used_by": str(used_by) if used_by else None,
+            "created_at": _created_at,
             "used_at": current_time,
         }
```

Also applies to: 41-48
todo/repositories/user_repository.py (1)
1-12
: Use a module-level logger for dual-write warnings

Avoid inline imports; define once and reuse.
```diff
 from datetime import datetime, timezone
 from typing import Optional, List
 from pymongo.collection import ReturnDocument
 from pymongo import ASCENDING
+import logging
+logger = logging.getLogger(__name__)
@@
-        if not dual_write_success:
-            import logging
-
-            logger = logging.getLogger(__name__)
-            logger.warning(f"Failed to sync user {user_model.id} to Postgres")
+        if not dual_write_success:
+            logger.warning(f"Failed to sync user {user_model.id} to Postgres")
```

Also applies to: 87-92
```diff
@@ -1 +1 @@
-# Added this because without this file Django isn't able to auto detect the test files
+# Django project initialization
```
🧹 Nitpick (assertive)
Prefer a module docstring over a comment for introspection and tooling
Switching to a docstring makes the message accessible via todo_project.__doc__
and improves consistency with Python conventions.
-# Django project initialization
+"""Django project initialization."""
🤖 Prompt for AI Agents
In todo_project/__init__.py around lines 1 to 1, replace the top-line comment
with a module-level docstring so the description becomes available via
todo_project.__doc__; change the single-line comment into a quoted string
literal (triple quotes if multi-line) as the first statement in the file to
follow Python conventions and enable introspection and tooling to read the
module doc.
"REFRESH_TOKEN_LIFETIME": int(os.getenv("REFRESH_TOKEN_LIFETIME", "604800")), | ||
} |
Env var mismatch for refresh token lifetime between TESTING and non-TESTING
Non-testing uses REFRESH_TOKEN_LIFETIME (Line 141) while TESTING uses REFRESH_LIFETIME (Line 133). This creates confusing behavior and broken overrides in CI. Standardize on one key.
Apply this diff for the non-testing block (and mirror it in TESTING, see below):
- "REFRESH_TOKEN_LIFETIME": int(os.getenv("REFRESH_TOKEN_LIFETIME", "604800")),
+ "REFRESH_TOKEN_LIFETIME": int(os.getenv("REFRESH_TOKEN_LIFETIME", "604800")),
Update the TESTING block outside the shown range:
- "REFRESH_TOKEN_LIFETIME": int(os.getenv("REFRESH_LIFETIME", "604800")),
+ "REFRESH_TOKEN_LIFETIME": int(os.getenv("REFRESH_TOKEN_LIFETIME", "604800")),
🤖 Prompt for AI Agents
In todo_project/settings/base.py around lines 141-142, the non-TESTING config
uses REFRESH_TOKEN_LIFETIME while the TESTING block uses REFRESH_LIFETIME (line
~133); standardize to REFRESH_LIFETIME by replacing REFRESH_TOKEN_LIFETIME with
REFRESH_LIFETIME in this non-TESTING block and then mirror the same key and
getenv usage in the TESTING block so both environments read the same env var
name and default value.
name="PostgresTaskAssignment", | ||
fields=[ | ||
("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), | ||
("mongo_id", models.CharField(blank=True, max_length=24, null=True, unique=True)), | ||
("task_mongo_id", models.CharField(max_length=24)), | ||
("assignee_id", models.CharField(max_length=24)), | ||
( | ||
"user_type", | ||
models.CharField( | ||
choices=[("user", "User"), ("team", "Team")], | ||
max_length=10, | ||
), | ||
), | ||
("team_id", models.CharField(blank=True, max_length=24, null=True)), | ||
("is_active", models.BooleanField(default=True)), | ||
("created_at", models.DateTimeField(default=django.utils.timezone.now)), | ||
("updated_at", models.DateTimeField(blank=True, null=True)), | ||
("created_by", models.CharField(max_length=24)), | ||
("updated_by", models.CharField(blank=True, max_length=24, null=True)), | ||
("last_sync_at", models.DateTimeField(auto_now=True)), | ||
( | ||
"sync_status", | ||
models.CharField( | ||
choices=[("SYNCED", "Synced"), ("PENDING", "Pending"), ("FAILED", "Failed")], | ||
default="SYNCED", | ||
max_length=20, | ||
), | ||
), | ||
("sync_error", models.TextField(blank=True, null=True)), | ||
], | ||
options={ | ||
"db_table": "postgres_task_assignments", | ||
"indexes": [ | ||
models.Index(fields=["mongo_id"], name="postgres_ta_mongo_i_326fa9_idx"), | ||
models.Index(fields=["task_mongo_id"], name="postgres_ta_task_mo_95ca3b_idx"), | ||
models.Index(fields=["assignee_id"], name="postgres_ta_assignee_95ca3b_idx"), | ||
models.Index(fields=["user_type"], name="postgres_ta_user_typ_d13fa3_idx"), | ||
models.Index(fields=["team_id"], name="postgres_ta_team_id_a0605f_idx"), | ||
models.Index(fields=["is_active"], name="postgres_ta_is_acti_8b9698_idx"), | ||
models.Index(fields=["sync_status"], name="postgres_ta_sync_st_385c3f_idx"), | ||
], | ||
}, |
🛠️ Refactor suggestion
Add unique active-assignment constraint (+ team_id check) to PostgresTaskAssignment
Mirrors model-level constraints to protect data integrity at the DB layer.
migrations.CreateModel(
name="PostgresTaskAssignment",
fields=[
@@
],
options={
"db_table": "postgres_task_assignments",
"indexes": [
@@
models.Index(fields=["sync_status"], name="postgres_ta_sync_st_385c3f_idx"),
],
+ "constraints": [
+ models.UniqueConstraint(
+ fields=("task_mongo_id", "assignee_id", "user_type"),
+ condition=models.Q(("is_active", True)),
+ name="uniq_active_assignment_per_assignee",
+ ),
+ models.CheckConstraint(
+ check=~models.Q(("user_type", "team"), ("team_id__isnull", True)),
+ name="team_assignment_requires_team_id",
+ ),
+ ],
},
),
🤖 Prompt for AI Agents
In todo/migrations/0001_initial_setup.py around lines 178 to 219, the DB-level
unique constraint for active assignments is missing; add a UniqueConstraint that
enforces uniqueness of (task_mongo_id, assignee_id, user_type, team_id) only
when is_active is True. Update the migration to include a UniqueConstraint (e.g.
models.UniqueConstraint(fields=["task_mongo_id","assignee_id","user_type","team_id"],
condition=models.Q(is_active=True), name="postgres_ta_active_unique_...")) in
the model options (or as an AddConstraint operation), and ensure you reference
models.Q in the migration so the partial (conditional) uniqueness is applied and
the team_id check is included.
    @classmethod
    def get_by_user_and_team(cls, user_id: str, team_id: str):
        collection = cls.get_collection()
        try:
            user_id_obj = ObjectId(user_id)
        except Exception:
            user_id_obj = user_id
        try:
            team_id_obj = ObjectId(team_id)
        except Exception:
            team_id_obj = team_id

        queries = [
            {"user_id": user_id_obj, "team_id": team_id_obj},
            {"user_id": user_id, "team_id": team_id_obj},
            {"user_id": user_id_obj, "team_id": team_id},
            {"user_id": user_id, "team_id": team_id},
        ]

        for query in queries:
            result = collection.find_one(query)
            if result:
                return result
        return None
🧹 Nitpick (assertive)
Add a compound index on (user_id, team_id) in Mongo
Frequent lookups by this pair (with $in) will otherwise scan the collection. Add a migration/init to create an index: [("user_id", 1), ("team_id", 1)].
🤖 Prompt for AI Agents
In todo/repositories/user_team_details_repository.py around lines 9 to 32, the
repository performs frequent lookups by (user_id, team_id) and needs a compound
index to avoid collection scans; add a DB migration or initialization step that
calls the collection.create_index with the compound key [("user_id", 1),
("team_id", 1)] (set background=True so it doesn't block, and consider
unique=True only if the data model guarantees one document per (user_id,
team_id)); ensure the migration runs at service startup or part of your
migrations pipeline so the index exists before heavy queries run.
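For illustration, index creation along those lines might look like the following sketch; the URI, database name, and index name are assumptions, and unique=True is deliberately left out:

```python
from pymongo import ASCENDING, MongoClient


def ensure_user_team_index(mongo_uri: str = "mongodb://localhost:27017/", db_name: str = "todo"):
    collection = MongoClient(mongo_uri)[db_name]["user_team_details"]
    # Compound index so lookups by (user_id, team_id) avoid collection scans.
    collection.create_index(
        [("user_id", ASCENDING), ("team_id", ASCENDING)],
        name="user_id_team_id_idx",
        background=True,  # build without blocking other operations
    )
```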
        except postgres_model.DoesNotExist:
            # Document doesn't exist in Postgres, create it
            return self.create_document(collection_name, data, mongo_id)
        except Exception as e:
            error_msg = f"Failed to update {collection_name}:{mongo_id} in Postgres: {str(e)}"
            logger.error(error_msg)
            self._record_sync_failure(collection_name, mongo_id, error_msg)
            return False
|
🛠️ Refactor suggestion
Catch ObjectDoesNotExist instead of model-scoped DoesNotExist
Avoids referencing postgres_model in except and works uniformly across models.
+from django.core.exceptions import ObjectDoesNotExist
@@
- except postgres_model.DoesNotExist:
+ except ObjectDoesNotExist:
# Document doesn't exist in Postgres, create it
return self.create_document(collection_name, data, mongo_id)
@@
- except postgres_model.DoesNotExist:
+ except ObjectDoesNotExist:
logger.warning(f"Document {collection_name}:{mongo_id} not found in Postgres for deletion")
return True # Consider this a success since the goal is achieved
Also applies to: 176-183
🤖 Prompt for AI Agents
In todo/services/dual_write_service.py around lines 135-143, the except clause
currently catches postgres_model.DoesNotExist; replace that with the generic
ObjectDoesNotExist (imported from django.core.exceptions) so the handler works
uniformly across models; update the import at the top of the file to include
ObjectDoesNotExist and change the other similar block at lines 176-183 to use
ObjectDoesNotExist as well, leaving the other Exception catch and error logging
unchanged.
    def _transform_team_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Transform team data for Postgres."""
        return {
            "name": data.get("name"),
            "description": data.get("description"),
            "invite_code": data.get("invite_code"),
            "poc_id": str(data.get("poc_id", "")) if data.get("poc_id") else None,
            "created_by": str(data.get("created_by", "")),
            "updated_by": str(data.get("updated_by", "")),
            "is_deleted": data.get("is_deleted", False),
            "created_at": data.get("created_at"),
            "updated_at": data.get("updated_at"),
        }
|
🧹 Nitpick (assertive)
Inconsistent transform input casing for teams (snake vs camel)
Team transform expects snake_case (invite_code, created_by) while tasks/labels use camelCase (createdAt). Standardize or accept both to reduce surprises.
I can patch _transform_team_data to read both variants, e.g.:
return {
"name": data.get("name"),
"description": data.get("description"),
"invite_code": data.get("invite_code") or data.get("inviteCode"),
"poc_id": str(data.get("poc_id") or data.get("pocId")) if (data.get("poc_id") or data.get("pocId")) else None,
"created_by": str(data.get("created_by") or data.get("createdBy", "")),
"updated_by": str(data.get("updated_by") or data.get("updatedBy", "")),
...
}
Want me to apply this pattern consistently across all transforms?
🤖 Prompt for AI Agents
In todo/services/dual_write_service.py around lines 266 to 279, the team
transform only reads snake_case keys while other transforms accept camelCase,
causing inconsistencies; update _transform_team_data to accept both snake_case
and camelCase for each field (e.g., invite_code or inviteCode, poc_id or pocId,
created_by or createdBy, updated_by or updatedBy, created_at or createdAt,
updated_at or updatedAt), prefer the snake_case value when present, convert
poc_id/created_by/updated_by to strings with the same None/default logic as now,
and apply this same dual-key pattern consistently to the other transform
functions so all transforms accept both casings.
    def get_sync_metrics(self) -> Dict[str, Any]:
        """
        Get metrics about sync operations.

        Returns:
            Dict: Sync metrics
        """
        try:
            metrics = {
                "total_failures": len(self.sync_failures),
                "failures_by_collection": {},
                "recent_failures": self.sync_failures[-10:] if self.sync_failures else [],
                "enabled": self.enabled,
            }

            # Count failures by collection
            for failure in self.sync_failures:
                collection = failure["collection"]
                if collection not in metrics["failures_by_collection"]:
                    metrics["failures_by_collection"][collection] = 0
                metrics["failures_by_collection"][collection] += 1

            return metrics
        except Exception as e:
            logger.error(f"Error getting sync metrics: {str(e)}")
            return {}
|
🧹 Nitpick (assertive)
Sanitize recent_failures in metrics to avoid leaking payloads
If failures later store operation/data, do not expose raw data in metrics. Return a redacted subset.
- metrics = {
- "total_failures": len(self.sync_failures),
- "failures_by_collection": {},
- "recent_failures": self.sync_failures[-10:] if self.sync_failures else [],
- "enabled": self.enabled,
- }
+ def _redact(fr):
+ keys = ("collection", "mongo_id", "error", "timestamp", "operation")
+ return {k: fr.get(k) for k in keys if k in fr}
+ metrics = {
+ "total_failures": len(self.sync_failures),
+ "failures_by_collection": {},
+ "recent_failures": [_redact(fr) for fr in (self.sync_failures[-10:] if self.sync_failures else [])],
+ "enabled": self.enabled,
+ }
🤖 Prompt for AI Agents
In todo/services/enhanced_dual_write_service.py around lines 119 to 145, the
current get_sync_metrics returns recent_failures containing full failure objects
which can leak operation/data; update the function to redact recent_failures by
mapping each failure to a safe subset (e.g., timestamp, collection, error
message/summary, and an optional non-sensitive identifier or hashed id) and
explicitly omit any fields like "operation", "data", "payload" or other raw
content; implement this by building a small helper or inline mapping that
picks/derives only the allowed keys for the last 10 failures before returning
metrics, and keep logging unchanged for exceptions.
            with connection.cursor() as cursor:
                cursor.execute(
                    """
                    SELECT EXISTS (
                        SELECT FROM information_schema.tables
                        WHERE table_schema = 'public'
                        AND table_name = %s
                    );
                    """,
                    [table_name],
                )
                return cursor.fetchone()[0]
        except Exception as e:
            logger.error(f"Error checking if table {table_name} exists: {str(e)}")
            return False
|
🧹 Nitpick (assertive)
Error handling and logging are good; consider wrapping per-table operations in a transaction
Optional: Surround bulk creates with transaction.atomic() to ensure atomicity per table.
Also applies to: 110-127
🤖 Prompt for AI Agents
In todo/services/postgres_sync_service.py around lines 70-85 (also apply same
change to 110-127), per-table bulk create/check operations aren’t wrapped in a
transaction; surround each table's bulk create/update block with a
transaction.atomic() context so that all DB changes for that table are committed
or rolled back together, ensuring atomicity and preventing partial writes — add
the transaction.atomic() context manager around the per-table operation and move
existing cursor/execute logic inside it.
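A small sketch of that wrapping, under the assumption that the per-table sync iterates Mongo documents and creates missing Postgres rows; the model and helper names are placeholders, not the PR's exact code:

```python
from django.db import transaction


def sync_collection_to_table(documents, postgres_model):
    """Create missing rows for one table atomically: all inserts commit or none do."""
    with transaction.atomic():
        for doc in documents:
            mongo_id = str(doc["_id"])
            # Idempotent per document: only create rows that are not already synced.
            postgres_model.objects.get_or_create(
                mongo_id=mongo_id,
                defaults={"sync_status": "SYNCED"},
            )
```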
…n docker-compose and application settings
a87e888 to d185900 (Compare)
Date: 25 Aug 2025
Developer Name: @AnujChhikara
Issue Ticket Number
Description
Documentation Updated?
Under Feature Flag
Database Changes
Breaking Changes
Development Tested?
Screenshots
Screenshot 1
Test Coverage
Screenshot 1
Additional Notes
Description by Korbit AI
What change is being made?
Introduce a dual-write mechanism to the codebase to synchronize data between MongoDB and PostgreSQL, including corresponding data models, service classes, repositories, migration files, and configuration updates.
Why are these changes being made?
The changes aim to facilitate a future migration from MongoDB to PostgreSQL by ensuring data consistency and reliability across both databases. This approach minimizes operational risks and enables a systematic transition through dual-writing and eventual read migration. The synchronization is monitored with appropriate error handling, retries, and system metrics to maintain data integrity and performance.
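To make the described flow concrete, here is a hedged sketch of the dual-write pattern (MongoDB remains the primary store, Postgres is mirrored best-effort); the class and method names are illustrative, not the PR's exact API:

```python
import logging

logger = logging.getLogger(__name__)


class DualWriteExample:
    def __init__(self, mongo_collection, postgres_model, enabled: bool = True):
        self.mongo_collection = mongo_collection
        self.postgres_model = postgres_model
        self.enabled = enabled

    def create(self, payload: dict) -> str:
        # 1) Write to MongoDB first; it remains the source of truth.
        mongo_id = str(self.mongo_collection.insert_one(payload).inserted_id)
        # 2) Mirror to Postgres; failures are logged and monitored, never raised,
        #    so the primary write path is not blocked.
        if self.enabled:
            try:
                self.postgres_model.objects.create(mongo_id=mongo_id, **payload)
            except Exception:
                logger.warning("Dual-write to Postgres failed for %s", mongo_id)
        return mongo_id
```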